package com.lhd.common.utils;

import com.lhd.bean.KeywordBean;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.*;


/**
 * Flink {@link SourceFunction} that runs a single {@code select *} against the ClickHouse
 * table {@code dws_traffic_source_keyword_page_view_window} and emits every returned row
 * as a {@link KeywordBean}.
 *
 * <p>The source is bounded: {@link #run(SourceContext)} returns as soon as the result set
 * is exhausted or {@link #cancel()} has been called, whichever happens first. All JDBC
 * resources are released before {@code run} returns, including when it exits with an
 * exception.
 */
public class ClickHousehouseSource implements SourceFunction<KeywordBean> {

    /** Fully-qualified class name of the ClickHouse JDBC driver. */
    private static final String DRIVER_CLASS = "ru.yandex.clickhouse.ClickHouseDriver";

    /** JDBC connection URL of the ClickHouse database that is queried. */
    private static final String JDBC_URL = "jdbc:clickhouse://hadoop102:8123/gmall";

    /** Query whose rows are turned into {@link KeywordBean} records. */
    private static final String QUERY =
            "select  *  from  dws_traffic_source_keyword_page_view_window";

    /**
     * Cleared by {@link #cancel()} to stop the read loop. Declared {@code volatile} because
     * Flink invokes {@code cancel()} from a different thread than the one executing
     * {@code run()}, so the write must be visible to the reading thread.
     */
    private volatile boolean running = true;

    /**
     * Executes the query and forwards each row to {@code ctx}.
     *
     * <p>Columns are read by position: 1 and 2 as timestamps, 3 and 4 as strings, 5 and 6
     * as longs, and are passed to the {@link KeywordBean} constructor in that order.
     *
     * @param ctx the context used to emit records
     * @throws Exception if the driver class cannot be loaded or any JDBC call fails
     */
    @Override
    public void run(SourceContext<KeywordBean> ctx) throws Exception {
        Class.forName(DRIVER_CLASS);

        // try-with-resources closes the result set, statement and connection (in that
        // order) on normal completion, on cancellation and on failure.
        try (Connection connection = DriverManager.getConnection(JDBC_URL);
             PreparedStatement ps = connection.prepareStatement(QUERY);
             ResultSet rs = ps.executeQuery()) {

            // Check the flag before every row so cancellation takes effect mid-scan, and
            // leave the loop once the rows run out instead of spinning on an exhausted
            // result set.
            while (running && rs.next()) {
                ctx.collect(new KeywordBean(
                        rs.getTimestamp(1),
                        rs.getTimestamp(2),
                        rs.getString(3),
                        rs.getString(4),
                        rs.getLong(5),
                        rs.getLong(6)));
            }
        }
    }

    /** Requests that {@link #run(SourceContext)} stop emitting records and return. */
    @Override
    public void cancel() {
        running = false;
    }
}
